/**
 * FileName: StructuredStreamingWinCount
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/03/13 10:12
 * Description: Reads Kafka data via SQL, processes it according to the configured
 *              process chain, and finally writes the result back to Kafka.
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.factory.SourceFactory;
import cn.com.bonc.process.chain.ProcessChain;
import cn.com.bonc.process.impl.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;


/**
 * Driver for a Structured Streaming job: obtains the dataset produced by the
 * {@code Constants.KAFKA_SOURCE} source, pushes it through a fixed {@link ProcessChain}
 * and writes the resulting rows to a Kafka topic using {@link OutputMode#Update()}.
 */
public class StructuredStreamingWinCount {

	/** Spark application name (runtime value kept exactly as originally deployed). */
	private static final String APP_NAME = "KafkaJoinRedisApp";

	/** Checkpoint directory passed to the Kafka sink via the {@code checkpointLocation} option. */
	private static final String CHECKPOINT_LOCATION = "hdfs://192.168.70.21:9000/test_szj/checkpoint";

	/** Kafka topic that receives the processed rows. */
	private static final String OUTPUT_TOPIC = "savetopic";

	/**
	 * Builds the Spark session, wires the processing chain, starts the streaming query
	 * and blocks until the query terminates.
	 *
	 * @param args command-line arguments; not used
	 * @throws IllegalStateException if the streaming query terminates with an error; the
	 *                               original {@link StreamingQueryException} is attached as the cause
	 */
	public static void main(String[] args) {

		SparkSession spark = SparkSession
				.builder()
				.appName(APP_NAME)
				.config("spark.redis.host", ConfigurationManager.getProperty(Constants.REDIS_IP))
				.config("spark.redis.port", ConfigurationManager.getProperty(Constants.REDIS_PORT))
				.getOrCreate();

		try {
			// NOTE(review): kept as a raw Dataset because the declared types of
			// SourceFactory#getDataset and ProcessChain#setSourceData are not visible here;
			// tighten to Dataset<Row> once those signatures are confirmed.
			Dataset dataset = SourceFactory.create(Constants.KAFKA_SOURCE).getDataset(spark);

			// The processes run in the order they are added to the chain.
			Dataset<Row> result = ProcessChain.setSourceData(dataset)
					.addProcess(new JoinExternalDataProcessImpl(spark))
					.addProcess(new Col2MutiColsProcessImpl())
					.addProcess(new SqlListProcessImpl(spark))
					.addProcess(new WindowCountProcessImpl())
					.addProcess(new MutiCols2ColProcessImpl())
					.execute();

			// For local debugging the Kafka sink can be swapped for
			// .format("console") with .option("truncate", "false").
			StreamingQuery query = result.writeStream()
					.outputMode(OutputMode.Update())
					.format("kafka")
					.option("checkpointLocation", CHECKPOINT_LOCATION)
					.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
					.option("topic", OUTPUT_TOPIC)
					.start();

			// Keep the driver alive until the query stops.
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			// Propagate instead of only printing the stack trace, so a failed query
			// does not let main() return as if the job had succeeded.
			throw new IllegalStateException("Streaming query terminated with an error", e);
		} finally {
			// Release the session whether the query ended normally or with an error.
			spark.stop();
		}
	}
}